package org.databandtech.flink.demo;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.databandtech.flink.flatmapfunc.Tokenizer;

/**
 * Word-count style example for the Flink DataSet API that reads a text file.
 *
 * <p>To try it, create a file {@code count.txt} on drive D: with content such as:
 *
 * <pre>
 * dog cat fish cat tiger dog man tiger cat tiger fish dog
 * dog cat
 * </pre>
 *
 * <p>The aggregated {@code Tuple2<String, Integer>} records are printed to stdout and also
 * written to a text file, one record per line with the two fields separated by a single space.
 *
 * @author Administrator
 *
 */
public class CountByTextFile {

	/** Input file read when no input path is given on the command line. */
	private static final String DEFAULT_INPUT_PATH = "d://count.txt";

	/** Output file written when no output path is given on the command line. */
	private static final String DEFAULT_OUTPUT_PATH = "d://out-count.txt";

	/**
	 * Runs the count job.
	 *
	 * @param args optional arguments: {@code args[0]} overrides the input path and
	 *             {@code args[1]} overrides the output path; absent arguments fall back to
	 *             the default paths
	 * @throws IllegalStateException if printing the results or executing the CSV-writing job
	 *                               fails (the original Flink exception is kept as the cause)
	 */
	public static void main(String[] args) {
		String inputPath = argOrDefault(args, 0, DEFAULT_INPUT_PATH);
		String outputPath = argOrDefault(args, 1, DEFAULT_OUTPUT_PATH);

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<String> dataSet = env.readTextFile(inputPath);
		// AggregateOperator extends DataSet<Tuple2<String, Integer>>, so it can be used like any DataSet.
		// NOTE(review): Tokenizer is project-local; presumably it splits each line on " " and
		// emits (token, 1) pairs, which groupBy(0).sum(1) then totals per token — confirm.
		AggregateOperator<Tuple2<String, Integer>> counts = dataSet
				.flatMap(new Tokenizer(" "))
				.groupBy(0)
				.sum(1);

		// In the DataSet API, print() is eager: it triggers a job execution on its own.
		try {
			counts.print();
		} catch (Exception e) {
			throw new IllegalStateException("Failed to compute and print counts for " + inputPath, e);
		}

		// Row delimiter "\n", field delimiter " ".
		counts.writeAsCsv(outputPath, "\n", " ");

		// This sink was added after print() executed, so a second job runs here to write it.
		try {
			env.execute("readTextFile flink");
		} catch (Exception e) {
			throw new IllegalStateException("Failed to write counts to " + outputPath, e);
		}
	}

	/**
	 * Returns {@code args[index]} when present, otherwise {@code defaultValue}.
	 *
	 * @param args         command-line arguments, may be {@code null}
	 * @param index        position of the wanted argument
	 * @param defaultValue value used when the argument is missing
	 * @return the argument at {@code index} or {@code defaultValue}
	 */
	private static String argOrDefault(String[] args, int index, String defaultValue) {
		if (args == null || args.length <= index || args[index] == null) {
			return defaultValue;
		}
		return args[index];
	}

}
